#ifndef XG_RABITTMQCONNECT_H
#define XG_RABITTMQCONNECT_H
////////////////////////////////////////////////////////////////
#include "../stdx/std.h"

#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>

class RabbitmqConnect
{
protected:
	amqp_socket_t* sock;
	amqp_rpc_reply_t res;
	amqp_connection_state_t conn;
	
public:
	RabbitmqConnect()
	{
		res.reply_type = AMQP_RESPONSE_NORMAL;
		sock = NULL;
	}
	~RabbitmqConnect()
	{
		close();
	}
	bool getResponse()
	{
		res = amqp_get_rpc_reply(conn);

		CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);

		return true;
	}
	int getErrorCode() const
	{
		return res.library_error;
	}
	string getErrorString() const
	{
		switch (res.reply_type)
		{
			case AMQP_RESPONSE_NORMAL: return "SUCCESS";

			case AMQP_RESPONSE_NONE: return "RESPONSE_NONE";

			case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(res.library_error);

			case AMQP_RESPONSE_SERVER_EXCEPTION:
				if (res.reply.id == AMQP_CONNECTION_CLOSE_METHOD || res.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
				{
					auto msg = ((amqp_connection_close_t*)(res.reply.decoded))->reply_text;

					return string((char*)(msg.bytes), (char*)(msg.bytes) + msg.len);
				}

				return "RESPONSE_SERVER_EXCEPTION";
			
			default: return "UNKNOWN ERROR";
		}
	}

public:
	void close()
	{
		if (sock)
		{
			amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
			amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
			amqp_destroy_connection(conn);
			sock = NULL;
		}
	}
	bool connect(const string& host, int port, bool ssl = false)
	{
		close();
		
		conn = amqp_new_connection();
		sock = ssl ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
		
		if (sock == NULL)
		{
			amqp_destroy_connection(conn);

			return false;
		}
		
		if (amqp_socket_open(sock, host.c_str(), port))
		{
			amqp_destroy_connection(conn);

			sock = NULL;

			return false;
		}
		
		return true;
	}
	int send(const string& exchange, const string& qname, const string& data)
	{
		amqp_basic_properties_t props;

		props.delivery_mode = 2;
		props.content_type = amqp_cstring_bytes("text/plain");
		props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

		return amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(qname.c_str()), 0, 0, &props, amqp_cstring_bytes(data.c_str()));
	}
	bool login(const string& usr, const string& pwd, amqp_sasl_method_enum method = AMQP_SASL_METHOD_PLAIN)
	{
		res = amqp_login(conn, "/", 0, 128 * 1024, 0, method, usr.c_str(), pwd.c_str());

		CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);

		amqp_channel_open(conn, 1);

		return getResponse();
	}
	bool recv(const string& exchange, const string& qname, std::function<void(const char*, int)> func, int timeout = 5, bool aotodelete = false)
	{
		timeval tv;
		amqp_bytes_t mq = amqp_cstring_bytes(qname.c_str());
		
		amqp_queue_declare(conn, 1, mq, 0, 1, 0, aotodelete ? 1 : 0, amqp_empty_table);

		CHECK_FALSE_RETURN(getResponse());

		amqp_queue_bind(conn, 1, mq, amqp_cstring_bytes(exchange.c_str()), mq, amqp_empty_table);

		CHECK_FALSE_RETURN(getResponse());

		amqp_basic_consume(conn, 1, mq, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

		CHECK_FALSE_RETURN(getResponse());

		tv.tv_sec = timeout;
		tv.tv_usec = 0;
	
		while (true)
		{
			amqp_envelope_t data;

			amqp_maybe_release_buffers(conn);

			res = amqp_consume_message(conn, &data, &tv, 0);
	 
			CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);

			func((char*)(data.message.body.bytes), data.message.body.len);

			amqp_destroy_envelope(&data);
		}

		return false;
	}
};

////////////////////////////////////////////////////////////////
#endif
